package pulsar

import (
	"context"
	"fmt"
	"strings"
	"time"

	"gitee.com/xfrm/middleware/xconfig"
	"gitee.com/xfrm/middleware/xcontext"
	"gitee.com/xfrm/middleware/xlog"

	"github.com/apache/pulsar-client-go/pulsar"
)

// mqNamespace is the config-center namespace passed to every lookup made by
// the *WithFallback helpers in this file.
const mqNamespace = "mq"

// roleType tells which side of a topic a config item belongs to. Its String
// form is the role segment of a config key.
type roleType int

const (
	// producerRoleType is rendered as "producer" by roleType.String.
	producerRoleType roleType = iota
	// consumerRoleType is rendered as "consumer" by roleType.String.
	consumerRoleType

	// apolloConfigSep separates the segments of a config key
	// (topic.role.lane.item). defaultGroup is the lane used by the fallback
	// lookups; parseTopicConfigKey reports it as the empty lane.
	apolloConfigSep = "."
	defaultGroup    = "default"
)

// ProducerConfig is the result of getProducerConfig: the embedded pulsar
// producer options plus the values read for the client itself.
//
// Broker holds the value of the "broker" item (presumably the service URL —
// confirm against the code that creates the client). OperationTimeout is
// derived from the "operation_timeout" item, which is expressed in
// milliseconds. ConnectionTimeout is not populated by getProducerConfig.
type ProducerConfig struct {
	*pulsar.ProducerOptions
	Broker            string
	OperationTimeout  time.Duration
	ConnectionTimeout time.Duration
}

// ConsumerConfig is the result of getConsumerConfig: the embedded pulsar
// consumer options plus the values read for the client itself.
//
// Broker holds the value of the "broker" item and BrokerAdmin the value of the
// optional "broker_admin" item (presumably the service and admin URLs —
// confirm against the code that consumes them). OperationTimeout is derived
// from the "operation_timeout" item, which is expressed in milliseconds.
// ConnectionTimeout is not populated by getConsumerConfig.
type ConsumerConfig struct {
	*pulsar.ConsumerOptions
	Broker            string
	BrokerAdmin       string
	OperationTimeout  time.Duration
	ConnectionTimeout time.Duration
}

// Item names: the last segment of a config key (topic.role.lane.item).
const (
	// Items read by getProducerConfig.
	maxPendingMessagesKey        = "max_pending_msg"
	hashSchemaKey                = "hash_schema"
	compressionTypeKey           = "compression_type"
	compressionLevelKey          = "compression_level"
	disableBatchingKey           = "disable_batch"
	batchingMaxPublishDelayMSKey = "batching_max_publish_delay_ms"
	batchingMaxMessagesKey       = "batching_max_msg"
	batchingMaxSizeKey           = "batching_max_size"

	// Items read by getConsumerConfig.
	subTypeKey                     = "type"
	subscriptionInitialPositionKey = "sub_init_position"
	receiverQueueSizeKey           = "receiver_queue_size"

	// Client-level items. In this file brokerKey and operationTimeoutKey are
	// read for both roles, brokerAdminKey only for consumers and
	// sendTimeoutKey only for producers.
	brokerKey           = "broker"
	brokerAdminKey      = "broker_admin"
	operationTimeoutKey = "operation_timeout"
	sendTimeoutKey      = "send_timeout"
)

// Default values used when the corresponding item is missing from the config
// center.
const (
	// Producer defaults. defaultBatchingMaxPublishDelay is in milliseconds;
	// defaultBatchingMaxMessages is a message count.
	defaultBatchingMaxPublishDelay = 10
	defaultBatchingMaxMessages     = 1000
	// defaultBatchingMaxSize is the default maximum number of bytes per batch.
	// The two timeouts that follow are in milliseconds; the operation timeout
	// default is also applied by getConsumerConfig.
	defaultBatchingMaxSize    = 128 * 1024
	defaultOperationTimeoutMS = 30000
	defaultSendTimeoutMS      = 3000

	// Consumer defaults. defaultSubType is converted to
	// pulsar.SubscriptionType; 2 is presumably pulsar.Failover — confirm
	// against the pulsar-client-go enum.
	defaultSubType           = 2
	defaultReceiverQueueSize = 1000
)

// apolloMQKey is the parsed form of a config key (topic.role.lane.item) as
// produced by parseTopicConfigKey. lane is the empty string when the key
// carries the default group.
type apolloMQKey struct {
	topic string
	role  roleType
	lane  string
	item  string
}

// String returns the role segment used in config keys: "producer" or
// "consumer". Any other value yields the empty string.
func (t roleType) String() string {
	if t == producerRoleType {
		return "producer"
	}
	if t == consumerRoleType {
		return "consumer"
	}
	// Not expected: only the two roles above are declared.
	return ""
}

// getProducerConfig assembles the producer settings for topic from the config
// center.
//
// Every item is read through the *WithFallback helpers with producerRoleType.
// The broker item is mandatory: an error is returned when it cannot be found.
// All other items are optional. A missing operation timeout, batching
// delay/count/size or send timeout takes the package default; the remaining
// items (max pending messages, hashing scheme, compression type/level, disable
// batching) keep whatever value the lookup returned.
//
// Timeouts and the batching delay are configured in milliseconds. A negative
// batching count or size is replaced by its default, because converting a
// negative int to uint would wrap around to a huge limit. ConnectionTimeout is
// not populated here.
func getProducerConfig(ctx context.Context, topic string, center xconfig.ConfigCenter) (*ProducerConfig, error) {
	fun := "getProducerConfig -->"

	// Shorthand for the integer lookups, which all share the same arguments.
	intItem := func(item string) (int, bool) {
		return getConfigIntItemWithFallback(ctx, center, topic, item, producerRoleType)
	}

	broker, ok := getConfigStringItemWithFallback(ctx, center, topic, brokerKey, producerRoleType)
	if !ok {
		return nil, fmt.Errorf("%s get broker not found, topic: %s", fun, topic)
	}

	operationTimeout, ok := intItem(operationTimeoutKey)
	if !ok {
		operationTimeout = defaultOperationTimeoutMS
		xlog.Debugf(ctx, "%s no operationTimeout config founds, topic: %s", fun, topic)
	}

	maxPendingMessages, ok := intItem(maxPendingMessagesKey)
	if !ok {
		xlog.Debugf(ctx, "%s no maxPendingMessages config founds, topic: %s", fun, topic)
	}
	xlog.Debugf(ctx, "%s maxPendingMessages, topic: %s, value: %d", fun, topic, maxPendingMessages)

	hashingSchema, ok := intItem(hashSchemaKey)
	if !ok {
		xlog.Debugf(ctx, "%s no hashingSchema config founds, topic: %s", fun, topic)
	}
	xlog.Debugf(ctx, "%s hashingSchema, topic: %s, value: %d", fun, topic, hashingSchema)

	compressionType, ok := intItem(compressionTypeKey)
	if !ok {
		xlog.Debugf(ctx, "%s no compressionType config founds, topic:%s", fun, topic)
	}
	xlog.Debugf(ctx, "%s compressionType, topic: %s, value: %d", fun, topic, compressionType)

	compressionLevel, ok := intItem(compressionLevelKey)
	if !ok {
		xlog.Debugf(ctx, "%s no compressionLevel config founds, topic: %s", fun, topic)
	}
	xlog.Debugf(ctx, "%s compressionLevel, topic:%s, value: %d", fun, topic, compressionLevel)

	disableBatching, ok := getConfigBoolItemWithFallback(ctx, center, topic, disableBatchingKey, producerRoleType)
	if !ok {
		xlog.Debugf(ctx, "%s no disableBatching config founds topic: %s", fun, topic)
	}
	xlog.Debugf(ctx, "%s disableBatching, topic: %s, value: %t", fun, topic, disableBatching)

	batchingMaxPublishDelayVal, ok := intItem(batchingMaxPublishDelayMSKey)
	if !ok {
		batchingMaxPublishDelayVal = defaultBatchingMaxPublishDelay
		xlog.Debugf(ctx, "%s no batchingMaxPublishDelayVal config topic: %s", fun, topic)
	}
	xlog.Debugf(ctx, "%s batchingMaxPublishDelayVal, topic: %s, value: %d", fun, topic, batchingMaxPublishDelayVal)

	batchingMaxMessages, ok := intItem(batchingMaxMessagesKey)
	if !ok {
		batchingMaxMessages = defaultBatchingMaxMessages
		xlog.Debugf(ctx, "%s no batchingMaxMessages config topic: %s", fun, topic)
	}
	if batchingMaxMessages < 0 {
		// uint(negative) wraps around, so never let a negative count reach
		// the conversion below.
		xlog.Debugf(ctx, "%s negative batchingMaxMessages %d, use default, topic: %s", fun, batchingMaxMessages, topic)
		batchingMaxMessages = defaultBatchingMaxMessages
	}
	xlog.Debugf(ctx, "%s batchingMaxMessages, topic: %s, value: %d", fun, topic, batchingMaxMessages)

	batchingMaxSize, ok := intItem(batchingMaxSizeKey)
	if !ok {
		batchingMaxSize = defaultBatchingMaxSize
		xlog.Debugf(ctx, "%s no batchingMaxSize config topic: %s", fun, topic)
	}
	if batchingMaxSize < 0 {
		// Same wrap-around hazard as for batchingMaxMessages.
		xlog.Debugf(ctx, "%s negative batchingMaxSize %d, use default, topic: %s", fun, batchingMaxSize, topic)
		batchingMaxSize = defaultBatchingMaxSize
	}
	xlog.Debugf(ctx, "%s batchingMaxSize, topic: %s, value: %d", fun, topic, batchingMaxSize)

	sendTimeout, ok := intItem(sendTimeoutKey)
	if !ok {
		sendTimeout = defaultSendTimeoutMS
		xlog.Debugf(ctx, "%s no sendTimeout config topic: %s", fun, topic)
	}
	xlog.Debugf(ctx, "%s sendTimeout, topic: %s, value: %d", fun, topic, sendTimeout)

	return &ProducerConfig{
		ProducerOptions: &pulsar.ProducerOptions{
			Topic:                   topic,
			MaxPendingMessages:      maxPendingMessages,
			HashingScheme:           pulsar.HashingScheme(hashingSchema),
			CompressionType:         pulsar.CompressionType(compressionType),
			CompressionLevel:        pulsar.CompressionLevel(compressionLevel),
			DisableBatching:         disableBatching,
			BatchingMaxPublishDelay: time.Duration(batchingMaxPublishDelayVal) * time.Millisecond,
			BatchingMaxMessages:     uint(batchingMaxMessages),
			BatchingMaxSize:         uint(batchingMaxSize),
			Name:                    getName(),
			SendTimeout:             time.Duration(sendTimeout) * time.Millisecond,
		},
		Broker:           broker,
		OperationTimeout: time.Duration(operationTimeout) * time.Millisecond,
	}, nil
}

// getConsumerConfig assembles the consumer settings for topic and
// subscriptionName from the config center.
//
// Every item is read through the *WithFallback helpers with consumerRoleType.
// The broker item is mandatory: an error is returned when it cannot be found.
// A missing subscription type, receiver queue size or operation timeout takes
// the package default; a missing broker admin or initial position keeps
// whatever value the lookup returned. The operation timeout is configured in
// milliseconds. ConnectionTimeout is not populated here.
func getConsumerConfig(ctx context.Context, topic string, subscriptionName string, center xconfig.ConfigCenter) (*ConsumerConfig, error) {
	const tag = "getConsumerConfig -->"

	// Both lookup flavours always use the same context, center, topic and role.
	strItem := func(item string) (string, bool) {
		return getConfigStringItemWithFallback(ctx, center, topic, item, consumerRoleType)
	}
	intItem := func(item string) (int, bool) {
		return getConfigIntItemWithFallback(ctx, center, topic, item, consumerRoleType)
	}

	brokerAddr, found := strItem(brokerKey)
	if !found {
		return nil, fmt.Errorf("%s get broker not found, topic: %s", tag, topic)
	}

	adminAddr, found := strItem(brokerAdminKey)
	if !found {
		xlog.Debugf(ctx, "%s no subscription brokerAdmin config found, topic: %s", tag, topic)
	}

	subKind, found := intItem(subTypeKey)
	if !found {
		subKind = defaultSubType
		xlog.Debugf(ctx, "%s no subscription type config found, topic: %s", tag, topic)
	}

	initPos, found := intItem(subscriptionInitialPositionKey)
	if !found {
		xlog.Debugf(ctx, "%s no subscription init position config found, topic: %s", tag, topic)
	}

	queueSize, found := intItem(receiverQueueSizeKey)
	if !found {
		queueSize = defaultReceiverQueueSize
		xlog.Debugf(ctx, "%s no receiver queue size config found, topic: %s", tag, topic)
	}

	opTimeoutMS, found := intItem(operationTimeoutKey)
	if !found {
		opTimeoutMS = defaultOperationTimeoutMS
		xlog.Debugf(ctx, "%s no operationTimeout config founds, topic: %s", tag, topic)
	}

	opts := &pulsar.ConsumerOptions{
		Topic:                       topic,
		Type:                        pulsar.SubscriptionType(subKind),
		SubscriptionInitialPosition: pulsar.SubscriptionInitialPosition(initPos),
		ReceiverQueueSize:           queueSize,
		SubscriptionName:            subscriptionName,
		Name:                        getName(),
	}
	cfg := &ConsumerConfig{
		ConsumerOptions:  opts,
		Broker:           brokerAddr,
		BrokerAdmin:      adminAddr,
		OperationTimeout: time.Duration(opTimeoutMS) * time.Millisecond,
	}
	return cfg, nil
}

// buildTopicConfigKey renders the config-center key for one item of a topic.
//
// Layout: topic.role.lane.item, joined with apolloConfigSep. The lane segment
// is whatever xcontext.GetControlRouteGroupWithMasterName returns for ctx with
// defaultGroup as its second argument (presumably the route group carried by
// ctx, falling back to defaultGroup — confirm in xcontext).
func buildTopicConfigKey(ctx context.Context, topic, item string, role roleType) string {
	roleName := role.String()
	lane := xcontext.GetControlRouteGroupWithMasterName(ctx, defaultGroup)

	segments := []string{topic, roleName, lane, item}
	return strings.Join(segments, apolloConfigSep)
}

// parseTopicConfigKey splits a config key of the form topic.role.lane.item on
// apolloConfigSep and returns its segments.
//
// The key must consist of exactly four segments, so a topic that itself
// contains the separator cannot be parsed. The role segment is converted with
// mqRoleTypeFromString; when that fails, the returned error wraps the
// conversion error. A lane equal to defaultGroup is reported as the empty
// string.
func parseTopicConfigKey(key string) (*apolloMQKey, error) {
	fun := "parseTopicConfigKey"

	parts := strings.Split(key, apolloConfigSep)
	if len(parts) != 4 {
		return nil, fmt.Errorf("%s invalid key:%s", fun, key)
	}

	role, err := mqRoleTypeFromString(parts[1])
	if err != nil {
		// Name the helper that actually failed and keep the cause available
		// to errors.Is / errors.As.
		return nil, fmt.Errorf("%s mqRoleTypeFromString err: %w", fun, err)
	}

	lane := parts[2]
	if lane == defaultGroup {
		lane = ""
	}

	return &apolloMQKey{
		topic: parts[0],
		role:  role,
		lane:  lane,
		item:  parts[3],
	}, nil
}

// getConfigStringItemWithFallback reads a string item from the mq namespace.
//
// The first lookup uses the key built from ctx. If the center reports the item
// as absent, the lookup is repeated with a derived context whose
// xcontext.ContextKeyControl value is a router fixed to defaultGroup. The
// value and found flag of the last lookup performed are returned.
func getConfigStringItemWithFallback(ctx context.Context, center xconfig.ConfigCenter, key string, name string, role roleType) (string, bool) {
	if v, found := center.GetStringWithNamespace(ctx, mqNamespace, buildTopicConfigKey(ctx, key, name, role)); found {
		return v, true
	}

	fallbackCtx := context.WithValue(ctx, xcontext.ContextKeyControl, simpleContextControlRouter{defaultGroup})
	return center.GetStringWithNamespace(fallbackCtx, mqNamespace, buildTopicConfigKey(fallbackCtx, key, name, role))
}

// getConfigIntItemWithFallback reads an integer item from the mq namespace.
//
// The first lookup uses the key built from ctx. If the center reports the item
// as absent, the lookup is repeated with a derived context whose
// xcontext.ContextKeyControl value is a router fixed to defaultGroup. The
// value and found flag of the last lookup performed are returned.
func getConfigIntItemWithFallback(ctx context.Context, center xconfig.ConfigCenter, key string, name string, role roleType) (int, bool) {
	if v, found := center.GetIntWithNamespace(ctx, mqNamespace, buildTopicConfigKey(ctx, key, name, role)); found {
		return v, true
	}

	fallbackCtx := context.WithValue(ctx, xcontext.ContextKeyControl, simpleContextControlRouter{defaultGroup})
	return center.GetIntWithNamespace(fallbackCtx, mqNamespace, buildTopicConfigKey(fallbackCtx, key, name, role))
}

// getConfigBoolItemWithFallback reads a boolean item from the mq namespace.
//
// The first lookup uses the key built from ctx. If the center reports the item
// as absent, the lookup is repeated with a derived context whose
// xcontext.ContextKeyControl value is a router fixed to defaultGroup. The
// value and found flag of the last lookup performed are returned.
func getConfigBoolItemWithFallback(ctx context.Context, center xconfig.ConfigCenter, key string, name string, role roleType) (bool, bool) {
	if v, found := center.GetBoolWithNamespace(ctx, mqNamespace, buildTopicConfigKey(ctx, key, name, role)); found {
		return v, true
	}

	fallbackCtx := context.WithValue(ctx, xcontext.ContextKeyControl, simpleContextControlRouter{defaultGroup})
	return center.GetBoolWithNamespace(fallbackCtx, mqNamespace, buildTopicConfigKey(fallbackCtx, key, name, role))
}

// simpleContextControlRouter is a control router that reports a fixed group
// name. The fallback lookups in this file store it in a context under
// xcontext.ContextKeyControl with group set to defaultGroup.
type simpleContextControlRouter struct {
	group string
}

// GetControlRouteGroup returns the stored group; the second result is always
// true.
func (s simpleContextControlRouter) GetControlRouteGroup() (string, bool) {
	return s.group, true
}

// SetControlRouteGroup always returns nil.
//
// NOTE(review): the receiver is a value, so the assignment below only changes
// the method's local copy and the caller's router keeps its previous group.
// If the setter is meant to take effect it needs a pointer receiver, and the
// call sites would then have to store a pointer in the context — confirm the
// intent before changing it.
func (s simpleContextControlRouter) SetControlRouteGroup(group string) error {
	s.group = group
	return nil
}
